www.gusucode.com > VC++ P2P下载软件源代码-源码程序 > VC++ P2P下载软件源代码-源码程序\code\client\ConnectionManager.cpp
//Download by http://www.NewXing.com /* * Copyright (C) 2001-2003 Jacek Sieka, j_s@telia.com * * This program is free software; you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation; either version 2 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program; if not, write to the Free Software * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA. */ #include "stdinc.h" #include "DCPlusPlus.h" #include "ConnectionManager.h" #include "DownloadManager.h" #include "UploadManager.h" #include "UserConnection.h" #include "CryptoManager.h" #include "ClientManager.h" #include "QueueManager.h" ConnectionManager* Singleton<ConnectionManager>::instance = NULL; //#define WITH_GETZBLOCK 1 ConnectionManager::ConnectionManager() : shuttingDown(false) { TimerManager::getInstance()->addListener(this); socket.addListener(this); features.push_back("BZList"); #ifdef WITH_GETZBLOCK features.push_back("GetZBlock"); #endif }; /** * Request a connection for downloading. * DownloadManager::addConnection will be called as soon as the connection is ready * for downloading. * @param aUser The user to connect to. */ void ConnectionManager::getDownloadConnection(const User::Ptr& aUser) { dcassert((bool)aUser); ConnectionQueueItem* cqi = NULL; { Lock l(cs); // Check the download pool if(find(downPool.begin(), downPool.end(), aUser) != downPool.end()) { if(find(pendingAdd.begin(), pendingAdd.end(), aUser) == pendingAdd.end()) pendingAdd.push_back(aUser); return; } // See if we're already trying to connect for(ConnectionQueueItem::TimeIter i = pendingDown.begin(); i != pendingDown.end(); ++i) { if(i->first->getUser() == aUser) { return; } } // Check if we have an active download connection already for(ConnectionQueueItem::Iter j = active.begin(); j != active.end(); ++j) { dcassert((*j)->getConnection()); if((*j == aUser) && ((*j)->getConnection()->isSet(UserConnection::FLAG_DOWNLOAD))) return; } // Add it to the pending... cqi = new ConnectionQueueItem(aUser); cqi->setState(ConnectionQueueItem::WAITING); pendingDown.insert(make_pair(cqi, 0)); fire(ConnectionManagerListener::ADDED, cqi); } } void ConnectionManager::putDownloadConnection(UserConnection* aSource, bool reuse /* = false */) { // Pool it for later usage... if(reuse) { aSource->addListener(this); { Lock l(cs); aSource->getCQI()->setState(ConnectionQueueItem::IDLE); dcassert(find(active.begin(), active.end(), aSource->getCQI()) != active.end()); active.erase(find(active.begin(), active.end(), aSource->getCQI())); downPool.push_back(aSource->getCQI()); } dcdebug("ConnectionManager::putDownloadConnection Pooling reusable connection %p to %s\n", aSource, aSource->getUser()->getNick().c_str()); } else { if(QueueManager::getInstance()->hasDownload(aSource->getCQI()->getUser())) { aSource->removeListeners(); aSource->disconnect(); Lock l(cs); ConnectionQueueItem* cqi = aSource->getCQI(); dcassert(cqi); // Remove the userconnection, don't need it any more dcassert(find(userConnections.begin(), userConnections.end(), aSource) != userConnections.end()); userConnections.erase(find(userConnections.begin(), userConnections.end(), aSource)); pendingDelete.push_back(aSource); cqi->setConnection(NULL); cqi->setState(ConnectionQueueItem::WAITING); dcassert(find(active.begin(), active.end(), aSource->getCQI()) != active.end()); active.erase(find(active.begin(), active.end(), aSource->getCQI())); pendingDown[cqi] = GET_TICK(); } else { { Lock l(cs); dcassert(find(active.begin(), active.end(), aSource->getCQI()) != active.end()); active.erase(find(active.begin(), active.end(), aSource->getCQI())); } putConnection(aSource); } } } void ConnectionManager::putUploadConnection(UserConnection* aSource) { { Lock l(cs); dcassert(find(active.begin(), active.end(), aSource->getCQI()) != active.end()); active.erase(find(active.begin(), active.end(), aSource->getCQI())); } putConnection(aSource); } void ConnectionManager::putConnection(UserConnection* aConn) { aConn->removeListeners(); aConn->disconnect(); ConnectionQueueItem* cqi = NULL; { Lock l(cs); cqi = aConn->getCQI(); dcassert(find(userConnections.begin(), userConnections.end(), aConn) != userConnections.end()); userConnections.erase(find(userConnections.begin(), userConnections.end(), aConn)); pendingDelete.push_back(aConn); } if(cqi) { fire(ConnectionManagerListener::REMOVED, cqi); delete cqi; } } void ConnectionManager::onTimerSecond(u_int32_t aTick) { ConnectionQueueItem::List failPassive; ConnectionQueueItem::List connecting; ConnectionQueueItem::List removed; User::List getDown; { Lock l(cs); { for(User::Iter k = pendingAdd.begin(); k != pendingAdd.end(); ++k) { ConnectionQueueItem::Iter i = find(downPool.begin(), downPool.end(), *k); if(i == downPool.end()) { // Hm, connection must have failed before it could be collected... getDown.push_back(*k); } else { ConnectionQueueItem* cqi = *i; downPool.erase(i); dcassert(find(active.begin(), active.end(), cqi) == active.end()); active.push_back(cqi); dcassert(cqi->getConnection()); dcassert(cqi->getConnection()->getCQI() == cqi); cqi->getConnection()->removeListener(this); DownloadManager::getInstance()->addConnection(cqi->getConnection()); } } pendingAdd.clear(); } bool tooMany = ((SETTING(DOWNLOAD_SLOTS) != 0) && DownloadManager::getInstance()->getDownloads() >= SETTING(DOWNLOAD_SLOTS)); bool tooFast = ((SETTING(MAX_DOWNLOAD_SPEED) != 0 && DownloadManager::getInstance()->getAverageSpeed() >= (SETTING(MAX_DOWNLOAD_SPEED)*1024))); bool startDown = !tooMany && !tooFast; int attempts = 0; ConnectionQueueItem::TimeIter i = pendingDown.begin(); while(i != pendingDown.end()) { ConnectionQueueItem* cqi = i->first; dcassert(cqi->getUser()); if(!cqi->getUser()->isOnline()) { // Not online anymore...remove him from the pending... pendingDown.erase(i++); removed.push_back(cqi); continue; } if( ((i->second + 60*1000) < aTick) && (attempts < 2) ) { i->second = aTick; if(!QueueManager::getInstance()->hasDownload(cqi->getUser())) { pendingDown.erase(i++); removed.push_back(cqi); continue; } if(cqi->getUser()->isSet(User::PASSIVE) && (SETTING(CONNECTION_TYPE) != SettingsManager::CONNECTION_ACTIVE)) { pendingDown.erase(i++); failPassive.push_back(cqi); continue; } // Always start high-priority downloads... if(!startDown) { startDown = QueueManager::getInstance()->hasDownload(cqi->getUser(), QueueItem::HIGHEST); } if(cqi->getState() == ConnectionQueueItem::WAITING) { if(startDown) { cqi->setState(ConnectionQueueItem::CONNECTING); cqi->getUser()->connect(); fire(ConnectionManagerListener::STATUS_CHANGED, cqi); attempts++; } else { cqi->setState(ConnectionQueueItem::NO_DOWNLOAD_SLOTS); fire(ConnectionManagerListener::FAILED, cqi, STRING(ALL_DOWNLOAD_SLOTS_TAKEN)); } } else if(cqi->getState() == ConnectionQueueItem::NO_DOWNLOAD_SLOTS && startDown) { cqi->setState(ConnectionQueueItem::WAITING); } } else if(((i->second + 50*1000) < aTick) && (cqi->getState() == ConnectionQueueItem::CONNECTING)) { fire(ConnectionManagerListener::FAILED, cqi, STRING(CONNECTION_TIMEOUT)); cqi->setState(ConnectionQueueItem::WAITING); } ++i; } } ConnectionQueueItem::Iter m; for(m = removed.begin(); m != removed.end(); ++m) { fire(ConnectionManagerListener::REMOVED, *m); delete *m; } for(m = failPassive.begin(); m != failPassive.end(); ++m) { QueueManager::getInstance()->removeSources((*m)->getUser(), QueueItem::Source::FLAG_PASSIVE); fire(ConnectionManagerListener::REMOVED, *m); delete *m; } for(User::Iter n = getDown.begin(); n != getDown.end(); ++n) { getDownloadConnection(*n); } } void ConnectionManager::onTimerMinute(u_int32_t aTick) { Lock l(cs); { for(UserConnection::Iter j = userConnections.begin(); j != userConnections.end(); ++j) { if(((*j)->getLastActivity() + 180*1000) < aTick) { (*j)->disconnect(); } } } for_each(pendingDelete.begin(), pendingDelete.end(), DeleteFunction<UserConnection*>()); pendingDelete.clear(); } /** * Someone's connecting, accept the connection and wait for identification... * It's always the other fellow that starts sending if he made the connection. */ void ConnectionManager::onIncomingConnection() throw() { UserConnection* uc = NULL; try { uc = getConnection(); uc->setFlag(UserConnection::FLAG_INCOMING); uc->setState(UserConnection::STATE_NICK); uc->setLastActivity(GET_TICK()); uc->accept(socket); } catch(const SocketException& e) { dcdebug("ConnectionManager::OnIncomingConnection caught: %s\n", e.getError().c_str()); if(uc) putConnection(uc); } } void ConnectionManager::connect(const string& aServer, short aPort, const string& aNick) { if(shuttingDown) return; UserConnection* uc = NULL; try { uc = getConnection(); uc->setNick(aNick); uc->setState(UserConnection::STATE_CONNECT); uc->connect(aServer, aPort); } catch(const SocketException&) { if(uc) putConnection(uc); } } void ConnectionManager::onConnected(UserConnection* aSource) throw() { dcassert(aSource->getState() == UserConnection::STATE_CONNECT); aSource->myNick(aSource->getNick()); aSource->lock(CryptoManager::getInstance()->getLock(), CryptoManager::getInstance()->getPk()); aSource->setState(UserConnection::STATE_NICK); } /** * Nick received. If it's a downloader, fine, otherwise it must be an uploader. */ void ConnectionManager::onMyNick(UserConnection* aSource, const string& aNick) throw() { if(aSource->getState() != UserConnection::STATE_NICK) { // Already got this once, ignore... dcdebug("CM::onMyNick %p sent nick twice\n", aSource); return; } dcassert(aNick.size() > 0); dcdebug("ConnectionManager::onMyNick %p, %s\n", aSource, aNick.c_str()); dcassert(!aSource->getUser()); // First, we try looking in the pending downloads...hopefully it's one of them... { Lock l(cs); for(ConnectionQueueItem::TimeIter i = pendingDown.begin(); i != pendingDown.end(); ++i) { if(i->first->getUser()->getNick() == aNick) { aSource->setUser(i->first->getUser()); // Indicate that we're interested in this file... aSource->setFlag(UserConnection::FLAG_DOWNLOAD); } } } if(!aSource->getUser()) { // Make sure we know who it is, i e that he/she is connected... if(!ClientManager::getInstance()->isOnline(aNick)) { dcdebug("CM::onMyNick Incoming connection from unknown user %s\n", aNick.c_str()); putConnection(aSource); return; } aSource->setUser(ClientManager::getInstance()->getUser(aNick)); // We don't need this connection for downloading...make it an upload connection instead... aSource->setFlag(UserConnection::FLAG_UPLOAD); } if( aSource->isSet(UserConnection::FLAG_INCOMING) ) { aSource->myNick(aSource->getUser()->getClientNick()); aSource->lock(CryptoManager::getInstance()->getLock(), CryptoManager::getInstance()->getPk()); } aSource->setState(UserConnection::STATE_LOCK); } void ConnectionManager::onLock(UserConnection* aSource, const string& aLock, const string& aPk) throw() { if(aSource->getState() != UserConnection::STATE_LOCK) { dcdebug("CM::onLock %p received lock twice, ignoring\n", aSource); return; } if( CryptoManager::getInstance()->isExtended(aLock) ) { // Alright, we have an extended protocol, set a user flag for this user and refresh his info... if( (aPk.find("DCPLUSPLUS") != string::npos) && aSource->getUser()) { aSource->getUser()->setFlag(User::DCPLUSPLUS); User::updated(aSource->getUser()); } aSource->supports(features); } aSource->setState(UserConnection::STATE_DIRECTION); aSource->direction(aSource->getDirectionString(), aSource->getNumber()); aSource->key(CryptoManager::getInstance()->makeKey(aLock)); } void ConnectionManager::onDirection(UserConnection* aSource, const string& dir, const string& num) throw() { if(aSource->getState() != UserConnection::STATE_DIRECTION) { dcdebug("CM::onDirection %p received direction twice, ignoring\n", aSource); return; } dcassert(aSource->isSet(UserConnection::FLAG_DOWNLOAD) ^ aSource->isSet(UserConnection::FLAG_UPLOAD)); if(dir == "Upload") { // Fine, the other fellow want's to send us data...make sure we really want that... if(aSource->isSet(UserConnection::FLAG_UPLOAD)) { // Huh? Strange...disconnect... putConnection(aSource); return; } } else { if(aSource->isSet(UserConnection::FLAG_DOWNLOAD)) { int number = Util::toInt(num); // Damn, both want to download...the one with the highest number wins... if(aSource->getNumber() < number) { // Damn! We lost! aSource->unsetFlag(UserConnection::FLAG_DOWNLOAD); aSource->setFlag(UserConnection::FLAG_UPLOAD); } else if(aSource->getNumber() == number) { putConnection(aSource); return; } } } dcassert(aSource->isSet(UserConnection::FLAG_DOWNLOAD) ^ aSource->isSet(UserConnection::FLAG_UPLOAD)); aSource->setState(UserConnection::STATE_KEY); } void ConnectionManager::onKey(UserConnection* aSource, const string&/* aKey*/) throw() { if(aSource->getState() != UserConnection::STATE_KEY) { dcdebug("CM::onKey Bad state, ignoring"); return; } // We don't want any messages while the Up/DownloadManagers are working... aSource->removeListener(this); dcassert(aSource->getUser()); { Lock l(cs); // Only one connection / user & direction... for(ConnectionQueueItem::Iter k = active.begin(); k != active.end(); ++k) { bool sameDirection = (*k)->getConnection()->isSet(UserConnection::FLAG_UPLOAD) == aSource->isSet(UserConnection::FLAG_UPLOAD); if( sameDirection && (*k == aSource->getUser()) ) { putConnection(aSource); return; } } ConnectionQueueItem* cqi = NULL; if(aSource->isSet(UserConnection::FLAG_DOWNLOAD)) { // See if we have a matching user in the pending connections... ConnectionQueueItem::TimeIter i; for(i = pendingDown.begin(); i != pendingDown.end(); ++i) { if(i->first->getUser() == aSource->getUser()) break; } if(i == pendingDown.end()) { putConnection(aSource); return; } cqi = i->first; pendingDown.erase(i); cqi->setConnection(aSource); } else { dcassert(aSource->isSet(UserConnection::FLAG_UPLOAD)); cqi = new ConnectionQueueItem(aSource->getUser()); cqi->setConnection(aSource); fire(ConnectionManagerListener::ADDED, cqi); } aSource->setCQI(cqi); dcassert(find(active.begin(), active.end(), cqi) == active.end()); active.push_back(cqi); fire(ConnectionManagerListener::CONNECTED, cqi); if(aSource->isSet(UserConnection::FLAG_DOWNLOAD)) { dcdebug("ConnectionManager::onKey, leaving to downloadmanager\n"); DownloadManager::getInstance()->addConnection(aSource); } else { dcassert(aSource->isSet(UserConnection::FLAG_UPLOAD)); dcdebug("ConnectionManager::onKey, leaving to uploadmanager\n"); UploadManager::getInstance()->addConnection(aSource); } } } void ConnectionManager::onFailed(UserConnection* aSource, const string& /*aError*/) throw() { if(aSource->isSet(UserConnection::FLAG_DOWNLOAD) && aSource->getCQI()) { { Lock l(cs); for(ConnectionQueueItem::Iter i = downPool.begin(); i != downPool.end(); ++i) { dcassert((*i)->getConnection()); if((*i)->getConnection() == aSource) { dcdebug("ConnectionManager::onError Removing connection %p to %s from active pool\n", aSource, aSource->getUser()->getNick().c_str()); downPool.erase(i); break; } } } } putConnection(aSource); } void ConnectionManager::removeConnection(const User::Ptr& aUser, int isDownload) { { Lock l(cs); for(UserConnection::Iter i = userConnections.begin(); i != userConnections.end(); ++i) { UserConnection* uc = *i; if(uc->getUser() == aUser && uc->isSet(isDownload ? UserConnection::FLAG_DOWNLOAD : UserConnection::FLAG_UPLOAD)) { uc->disconnect(); break; } } } } void ConnectionManager::shutdown() { shuttingDown = true; socket.removeListener(this); socket.disconnect(); { Lock l(cs); for(UserConnection::Iter j = userConnections.begin(); j != userConnections.end(); ++j) { (*j)->disconnect(); } } // Wait until all connections have died out... while(true) { { Lock l(cs); if(userConnections.empty()) { break; } } Thread::sleep(50); } } // ServerSocketListener void ConnectionManager::onAction(ServerSocketListener::Types type) throw() { switch(type) { case ServerSocketListener::INCOMING_CONNECTION: onIncomingConnection(); } } // UserConnectionListener void ConnectionManager::onAction(UserConnectionListener::Types type, UserConnection* conn) throw() { switch(type) { case UserConnectionListener::CONNECTED: onConnected(conn); break; default: break; } } void ConnectionManager::onAction(UserConnectionListener::Types type, UserConnection* conn, const string& line) throw() { switch(type) { case UserConnectionListener::MY_NICK: onMyNick(conn, line); break; case UserConnectionListener::KEY: onKey(conn, line); break; case UserConnectionListener::FAILED: onFailed(conn, line); break; default: break; } } void ConnectionManager::onAction(UserConnectionListener::Types type, UserConnection* conn, const string& line1, const string& line2) throw() { switch(type) { case UserConnectionListener::C_LOCK: onLock(conn, line1, line2); break; case UserConnectionListener::DIRECTION: onDirection(conn, line1, line2); break; default: break; } } // UserConnectionListener void ConnectionManager::onAction(UserConnectionListener::Types type, UserConnection* conn, const StringList& feat) throw() { switch(type) { case UserConnectionListener::SUPPORTS: { for(StringList::const_iterator i = feat.begin(); i != feat.end(); ++i) { if(*i == "BZList") conn->setFlag(UserConnection::FLAG_SUPPORTS_BZLIST); #ifdef WITH_GETZBLOCK else if(*i == "GetZBlock") conn->setFlag(UserConnection::FLAG_SUPPORTS_GETZBLOCK); #endif } } break; default: break; } } // TimerManagerListener void ConnectionManager::onAction(TimerManagerListener::Types type, u_int32_t aTick) throw() { switch(type) { case TimerManagerListener::SECOND: onTimerSecond(aTick); break; case TimerManagerListener::MINUTE: onTimerMinute(aTick); break; } } /** * @file * $Id: ConnectionManager.cpp,v 1.61 2003/07/15 14:53:10 arnetheduck Exp $ */